import { createPool } from 'generic-pool'
import { Pipeline, Redis } from 'ioredis'

import { PluginsServerConfig } from '../../types'
import { REDIS_SERVER_KIND, createRedis } from '../../utils/db/redis'
import { timeoutGuard } from '../../utils/db/utils'
import { logger } from '../../utils/logger'
import { captureException } from '../../utils/posthog'
import { defineLuaTokenBucket } from './redis-token-bucket.lua'

type WithCheckRateLimit<T, TV2> = {
    checkRateLimit: (key: string, now: number, cost: number, poolMax: number, fillRate: number, expiry: number) => T
    checkRateLimitV2: (key: string, now: number, cost: number, poolMax: number, fillRate: number, expiry: number) => TV2
}

export type RedisClientPipeline = Pipeline & WithCheckRateLimit<number, [number, number]>

export type RedisClient = Omit<Redis, 'pipeline'> &
    WithCheckRateLimit<Promise<number>, Promise<[number, number]>> & {
        pipeline: () => RedisClientPipeline
    }

export type RedisOptions = {
    name: string
    timeout?: number
    failOpen?: boolean
}

export type RedisV2 = {
    useClient: <T>(options: RedisOptions, callback: (client: RedisClient) => Promise<T>) => Promise<T | null>
    usePipeline: (
        options: RedisOptions,
        callback: (pipeline: RedisClientPipeline) => void
    ) => Promise<Array<[Error | null, any]> | null>
}

// NOTE: This is intended to replace the general redis client with a nicer wrapper for using the client safely with the acquire locking
export const createRedisV2Pool = (config: PluginsServerConfig, kind: REDIS_SERVER_KIND): RedisV2 => {
    const pool = createPool<RedisClient>(
        {
            create: async () => {
                const client = await createRedis(config, kind)

                defineLuaTokenBucket(client)

                return client as RedisClient
            },
            destroy: async (client) => {
                await client.quit()
            },
        },
        {
            min: config.REDIS_POOL_MIN_SIZE,
            max: config.REDIS_POOL_MAX_SIZE,
            autostart: true,
        }
    )

    const useClient: RedisV2['useClient'] = async (options, callback) => {
        const timeout = timeoutGuard(
            `Redis call ${options.name} delayed. Waiting over 30 seconds.`,
            undefined,
            options.timeout
        )
        const client = await pool.acquire()

        try {
            return await callback(client)
        } catch (e) {
            if (options.failOpen) {
                // We log the error and return null
                captureException(e)
                logger.error(`Redis call${options.name} failed`, e)
                return null
            }
            throw e
        } finally {
            await pool.release(client)
            clearTimeout(timeout)
        }
    }

    const usePipeline: RedisV2['usePipeline'] = async (options, callback) => {
        return useClient(options, async (client) => {
            const pipeline = client.pipeline()
            callback(pipeline)
            return pipeline.exec()
        })
    }

    return {
        useClient,
        usePipeline,
    }
}

export type RedisPipelineResults = [Error | null, any][]

export const getRedisPipelineResults = (
    res: RedisPipelineResults,
    index: number,
    numOperations: number
): RedisPipelineResults => {
    // pipeline results are just a big array of operation results so we need to slice out the correct parts
    return res.slice(index * numOperations, index * numOperations + numOperations)
}
